盒子
盒子
文章目录
  1. QEMU源码分析 - 协程
    1. 0x01 协程创建 - qemu_coroutine_create
    2. 0x02 协程进入 - qemu_coroutine_enter
    3. 0x03 协程删除 - coroutine_delete
    4. 0x04 Reference

QEMU源码分析 - 协程

QEMU源码分析 - 协程

审计的版本为6.0.0

先看QEMU中协程结构体,Coroutine为协程的封装结构体,CoroutineUContext为协程的上下文,可以看成是进一步的封装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
struct Coroutine {
CoroutineEntry *entry; //入口函数
void *entry_arg; //入口函数参数
Coroutine *caller; //调用者

/* Only used when the coroutine has terminated. */
QSLIST_ENTRY(Coroutine) pool_next; //协程池链

size_t locks_held;

/* Only used when the coroutine has yielded. */
AioContext *ctx;
};


typedef struct {
Coroutine base; //封装的协程结构体
void *stack;
size_t stack_size; //开辟的栈大小
sigjmp_buf env; //当前上下文环境变量
} CoroutineUContext;

0x01 协程创建 - qemu_coroutine_create

来看创建的函数实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
Coroutine *qemu_coroutine_create(CoroutineEntry *entry, void *opaque)
{
Coroutine *co = NULL;

if (CONFIG_COROUTINE_POOL) { //判断是否使用协程池
co = QSLIST_FIRST(&alloc_pool); //取出第一个协程结构体
if (!co) {
if (release_pool_size > POOL_BATCH_SIZE) {
//...
}
}
if (co) {
QSLIST_REMOVE_HEAD(&alloc_pool, pool_next);
alloc_pool_size--;
}
}

if (!co) { //没有协程池则新建一个协程,关键函数
co = qemu_coroutine_new();
}

co->entry = entry; //设置协程入口函数
co->entry_arg = opaque; //设置函数参数
QSIMPLEQ_INIT(&co->co_queue_wakeup);
return co;
}



Coroutine *qemu_coroutine_new(void)
{
CoroutineUContext *co;
ucontext_t old_uc, uc;
sigjmp_buf old_env;
union cc_arg arg = {0};
void *fake_stack_save = NULL;

if (getcontext(&uc) == -1) { //初始化结构体,将当前上下文保存到uc
abort();
}

//协程初始化
co = g_malloc0(sizeof(*co));
co->stack_size = COROUTINE_STACK_SIZE;
co->stack = qemu_alloc_stack(&co->stack_size);
co->base.entry_arg = &old_env; //设置old_env为后续入口参数

uc.uc_link = &old_uc;
uc.uc_stack.ss_sp = co->stack;
uc.uc_stack.ss_size = co->stack_size;
uc.uc_stack.ss_flags = 0;

arg.p = co;

on_new_fiber(co);
makecontext(&uc, (void (*)(void))coroutine_trampoline,
2, arg.i[0], arg.i[1]); //制造一个上下文,设置入口函数

if (!sigsetjmp(old_env, 0)) { //保存当前上下文到old_env
swapcontext(&old_uc, &uc); //保存当前上下文到old_uc,切换上下文到uc中去
}

return &co->base;
}

swapcontext(&old_uc, &uc)函数会跳转上下文到uc中,前面已经制造了一个uc的上下文并设置了入口函数,因此,会跳转到coroutine_trampoline函数中去,函数参数为co协程。详细情况看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static void coroutine_trampoline(int i0, int i1)
{
union cc_arg arg;
CoroutineUContext *self;
Coroutine *co;
void *fake_stack_save = NULL;

finish_switch_fiber(NULL);

arg.i[0] = i0;
arg.i[1] = i1;
self = arg.p; //实际为前面提到的co协程
co = &self->base; //也就是co->base

if (!sigsetjmp(self->env, 0)) { //当前上下文保存到self->env
siglongjmp(*(sigjmp_buf *)co->entry_arg, 1); //跳转到co->entry_arg上下文去,并返回1
}

while (true) {
co->entry(co->entry_arg);
qemu_coroutine_switch(co, co->caller, COROUTINE_TERMINATE);
}
}

这里后续又调回到co->entry_arg中了,在前面可以得知他被赋值为了&old_env,也就是跳转回了qemu_coroutine_create函数中的if (!sigsetjmp(old_env, 0))这一行,因为返回值为1,因此跳出if判断。最终返回了&co->base协程结构体。

0x02 协程进入 - qemu_coroutine_enter

具体情况看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
void qemu_coroutine_enter(Coroutine *co)
{
qemu_aio_coroutine_enter(qemu_get_current_aio_context(), co);
}

void qemu_aio_coroutine_enter(AioContext *ctx, Coroutine *co)
{
QSIMPLEQ_HEAD(, Coroutine) pending = QSIMPLEQ_HEAD_INITIALIZER(pending);
Coroutine *from = qemu_coroutine_self(); //获取当前协程的上下文

QSIMPLEQ_INSERT_TAIL(&pending, co, co_queue_next);

/* Run co and any queued coroutines */
while (!QSIMPLEQ_EMPTY(&pending)) {
Coroutine *to = QSIMPLEQ_FIRST(&pending); //获取传入的co协程
CoroutineAction ret;

QSIMPLEQ_REMOVE_HEAD(&pending, co_queue_next);

to->caller = from; //设置to协程的调用者为from(方便后续返回)
to->ctx = ctx;

smp_wmb();

ret = qemu_coroutine_switch(from, to, COROUTINE_ENTER); //切换协程,关键函数

QSIMPLEQ_PREPEND(&pending, &to->co_queue_wakeup);

switch (ret) {
case COROUTINE_YIELD:
break;
case COROUTINE_TERMINATE:
assert(!to->locks_held);
trace_qemu_coroutine_terminate(to);
coroutine_delete(to);
break;
default:
abort();
}
}
}

可以看到函数设置了两个协程,一个保存当前函数的上下文环境,一个保存用户创建的协程环境。并将两者通过caller连接起来,最终切换协程。切换协程看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
CoroutineAction __attribute__((noinline))
qemu_coroutine_switch(Coroutine *from_, Coroutine *to_,
CoroutineAction action)
{
CoroutineUContext *from = DO_UPCAST(CoroutineUContext, base, from_);
CoroutineUContext *to = DO_UPCAST(CoroutineUContext, base, to_);
int ret;
void *fake_stack_save = NULL;

current = to_; //将用户协程赋值给全局变量current

ret = sigsetjmp(from->env, 0); //保存当前上下文到from->env中(协程进入函数的协程)
if (ret == 0) {
siglongjmp(to->env, action); //跳转到to->env上下文中去,返回COROUTINE_ENTER = 3
}

finish_switch_fiber(fake_stack_save);

return ret;
}

这里跳转的情况从先前创建协程函数可以知道是在coroutine_trampoline函数中的if (!sigsetjmp(self->env, 0)) {这里。因此跳转过去,返回值为COROUTINE_ENTER。往下看执行情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static void coroutine_trampoline(int i0, int i1)
{
while (true) {
co->entry(co->entry_arg); //执行entry入口函数,也就是用户创建的时候自定义的函数
qemu_coroutine_switch(co, co->caller, COROUTINE_TERMINATE); //切换线程
}
}


CoroutineAction __attribute__((noinline))
qemu_coroutine_switch(Coroutine *from_, Coroutine *to_,
CoroutineAction action)
{
CoroutineUContext *from = DO_UPCAST(CoroutineUContext, base, from_); //用户协程co
CoroutineUContext *to = DO_UPCAST(CoroutineUContext, base, to_); //协程进入函数的协程
int ret;
void *fake_stack_save = NULL;

current = to_;

ret = sigsetjmp(from->env, 0);
if (ret == 0) {
siglongjmp(to->env, action); //跳转到协程进入函数的env中,返回COROUTINE_TERMINATE = 2
}

finish_switch_fiber(fake_stack_save);

return ret;
}

这里有点绕,仔细梳理一下就知道是跳回到第一次执行qemu_coroutine_switch函数的ret = sigsetjmp(from->env, 0);这里。根据返回值返回到qemu_aio_coroutine_enter函数的ret = qemu_coroutine_switch(from, to, COROUTINE_ENTER);这里,看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void qemu_aio_coroutine_enter(AioContext *ctx, Coroutine *co)
{
ret = qemu_coroutine_switch(from, to, COROUTINE_ENTER); //切换协程,关键函数

QSIMPLEQ_PREPEND(&pending, &to->co_queue_wakeup);

switch (ret) { //ret = COROUTINE_TERMINATE
case COROUTINE_YIELD:
break;
case COROUTINE_TERMINATE:
assert(!to->locks_held);
trace_qemu_coroutine_terminate(to);
coroutine_delete(to); //删除用户定义的协程
break;
default:
abort();
}
}
}

0x03 协程删除 - coroutine_delete

看上面后续最终执行的删除协程函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static void coroutine_delete(Coroutine *co)
{
co->caller = NULL; //用户定义的协程co,删除调用者caller

qemu_coroutine_delete(co);
}


void qemu_coroutine_delete(Coroutine *co_)
{
CoroutineUContext *co = DO_UPCAST(CoroutineUContext, base, co_);

g_free(co); //free协程结构体
}

到这里,协程整个从创建、进入、删除的周期就叙述完了,进入的部分有一些绕,需要仔细阅读,其余的部分作用上来说我觉得和线程还是有区别的,协程是一种异步状态,而线程是一种同步状态。

QEMU里结合了setjmp/longjmpucontext两种协程方式的编程方式还是比较厉害的。

0x04 Reference

  1. https://royhunter.github.io/2016/06/24/qemu-coroutine/
  2. https://www.cnblogs.com/VincentXu/p/3350389.html
  3. https://blog.csdn.net/LPSTC123/article/details/45009819
支持一下
扫一扫,支持v1nke
  • 微信扫一扫
  • 支付宝扫一扫